package com.it.windows;

import com.it.operator.utils.SourceUtils;
import com.it.pojo.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.HashSet;

/**
 * @author code1997
 */
public class UrlVisitCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<Event> eventSource = SourceUtils.getEventSource(executionEnvironment)
                //调用flink内置的针对于有序流的watermark策略
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));
        eventSource.keyBy(event -> event.url)
                //滑动事件时间窗口
                //.window(SlidingEventTimeWindows.of(Time.hours(1)))
                //滚动事件时间窗口：默认是整点，可以通过offset来进行调整
                .window(TumblingEventTimeWindows.of(Time.seconds(2L)))
                //聚合函数:
                .aggregate(new WindowAggAndProcessTest.MyAggFunction(),
                        new WindowAggAndProcessTest.MyProcessFunction()
                ).print();
        executionEnvironment.execute();
    }

    static class MyAggFunction implements AggregateFunction<Event, HashSet<String>, Long> {

        @Override
        public HashSet<String> createAccumulator() {
            return new HashSet<>();
        }

        @Override
        public HashSet<String> add(Event value, HashSet<String> accumulator) {
            accumulator.add(value.user);
            return accumulator;
        }

        @Override
        public Long getResult(HashSet<String> accumulator) {
            return (long) accumulator.size();
        }

        @Override
        public HashSet<String> merge(HashSet<String> a, HashSet<String> b) {
            return null;
        }
    }

    static class MyProcessFunction extends ProcessWindowFunction<Long, String, Boolean, TimeWindow> {


        @Override
        public void process(Boolean aBoolean, ProcessWindowFunction<Long, String, Boolean, TimeWindow>.Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
            //结合窗口信息输出
            Long uv = elements.iterator().next();
            long start = context.window().getStart();
            long end = context.window().getEnd();
            out.collect("窗口 " + new Timestamp(start) + "~" + new Timestamp(end) + " 的uv值为：" + uv);
        }
    }
}
